package com.lxk.service.event.handler;

import com.lxk.common.GsonUtils;
import com.lxk.service.component.RedisClient;
import com.lxk.service.event.AbstractEventHandler;
import com.lxk.service.event.domain.EventHandleMethodUnit;
import com.lxk.utils.TraceUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;


import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.*;

import static com.lxk.common.Constant.RedisConstants.EVENT_QUEUE;

/**
 * Created by Lxk on 2020/8/8.
 */
@Slf4j
@Component
public class EventRetryHandler implements ApplicationListener<ContextRefreshedEvent>,ApplicationContextAware{

    private ApplicationContext applicationContext;

    @Resource
    private RedisClient redisClient;

    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue(100);

    Executor taskExecutor = new ThreadPoolExecutor(5,10,10,TimeUnit.SECONDS,blockingQueue);

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    static class RetryWorker implements Runnable{

        private EventHandleMethodUnit unit;

        public RetryWorker(EventHandleMethodUnit unit) {
            this.unit = unit;
        }

        @Override
        public void run() {
            try{
                if(ObjectUtils.isEmpty(unit.getBean())){
                    log.error("EventRetryHandler.run unit is illegal. unit:{}",unit);
                    throw new IllegalStateException("execute bean is null");
                }
                Method[] methodArr = ReflectionUtils.getAllDeclaredMethods(unit.getBean().getClass());
                Optional<Method> methodOp = Arrays.stream(methodArr).filter(each -> unit.getMethodName().equals(each.getName())).findFirst();
                if(methodOp.isPresent()){
                    methodOp.get().setAccessible(true);
                    log.info("paramType:",methodOp.get().getParameterTypes()[0]);
                    methodOp.get().invoke(unit.getBean(), GsonUtils.getGson().fromJson(unit.getEventContext(),((AbstractEventHandler)unit.getBean()).eventType()));
                }else{
                    log.error("未找到执行的方法,bean:{},method:{}",unit.getBean(),unit.getMethodName());
                }
            }catch (Exception e){
                log.error("异常，e：",e);
                log.error("事件重试失败，事件元信息：{}",unit);
            }
        }
    }

    /**
     * 需要加开关，避免特殊情况cpu被打死
     * @param contextStartedEvent
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextStartedEvent) {
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                TraceUtil.genTrace("event_retry_task");
                String eventContext = redisClient.lpop(EVENT_QUEUE);
                log.info("获取事件补偿元信息,unitInfo:{}",eventContext);
                while(!StringUtils.isEmpty(eventContext)){
                    EventHandleMethodUnit unit = GsonUtils.getGson().fromJson(eventContext,EventHandleMethodUnit.class);
                    Object ob = null;
                    try{
                        ob = applicationContext.getBean(unit.getBeanName());
                    }catch (Exception e){
                        log.error("事件处理bean实例获取失败，beanName:{}",unit.getBeanName());
                    }
                    if(!ObjectUtils.isEmpty(ob)){
                        unit.setBean(ob);
                        taskExecutor.execute(new RetryWorker(unit));
                    }else{
                        log.error("事件处理类获取失败，EventHandleMethodUnit：{}",unit);
                    }
                    eventContext = redisClient.lpop(EVENT_QUEUE);
                }
                TraceUtil.cleanTrace();
            }
        },0,25, TimeUnit.SECONDS);
    }
}
